package com.sui.bigdata.flink.sql.side;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Manual test utility that publishes a fixed set of JSON messages to a Kafka topic.
 *
 * <p>Each instance owns one {@link KafkaProducer}. Both {@link #produce(String)} and
 * {@link #produceP0(String)} close that producer before returning, so an instance is
 * intended for a single publish call; create a new instance for the next batch.
 *
 * <p>Send results are not inspected: the value returned by {@code send} is discarded,
 * so a delivery failure is not reported by this class.
 *
 * <p>Created 2019/9/5 13:45. Contact: guosheng_dong@sui.com
 *
 * @author dongguosheng
 */
public class SendMessageTOKafka {

    /** Broker list used by the no-argument constructor. */
    private static final String DEFAULT_BOOTSTRAP_SERVERS = "10.201.7.187:9093,10.201.7.188:9093";

    /** Producer owned by this instance; closed by whichever publish method is called. */
    private final KafkaProducer<String, String> producer;

    /**
     * Creates a sender connected to the default broker list.
     */
    public SendMessageTOKafka() {
        this(DEFAULT_BOOTSTRAP_SERVERS);
    }

    /**
     * Creates a sender connected to the given brokers.
     *
     * @param bootstrapServers comma-separated {@code host:port} list passed to the producer
     *                         as {@code bootstrap.servers}
     */
    public SendMessageTOKafka(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);

        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // No partitioner class is configured here and the records below carry no key,
        // so partition selection is left to the producer's default behavior.
        producer = new KafkaProducer<>(props);
    }

    /**
     * Publishes nine fixed user-info JSON messages to the given topic, then closes the producer.
     *
     * <p>The messages differ only in {@code suid}, {@code phone} and {@code user_id}; all other
     * fields are constant.
     *
     * @param topic destination Kafka topic
     */
    public void produce(String topic) {
        String[] payloads = {
            userInfoJson("u_ssj10000", "18820986135", "25344948"),
            userInfoJson("u_ssj10000", "18720906135", "264759144"),
            userInfoJson("u_ssj10000", "18720986136", "24655924"),
            userInfoJson("u_ssj10000", "18720984135", "254469501"),
            userInfoJson("u_ssj1", "18720986135", "264469990"),
            userInfoJson("u_ssj1", "18720986135", "254469501"),
            userInfoJson("u_ssj1", "18820986165", "254469501"),
            userInfoJson("u_ssj1", "15820686165", "254469501"),
            userInfoJson("u_ssj1", "18820286165", "254469501"),
        };
        sendAllAndClose(topic, payloads);
    }

    /**
     * Publishes three fixed {@code rowData} JSON messages to the given topic, then closes the
     * producer.
     *
     * @param topic destination Kafka topic
     */
    public void produceP0(String topic) {
        String[] payloads = {
            "{\"rowData\":\"{\\\"user_id\\\":\\\"zftest_userid_100000002\\\",\\\"account\\\":\\\"u_afuwgy6h01\\\"}\"}",
            "{\"rowData\":\"{\\\"user_id\\\":\\\"zftest_userid_100000001\\\",\\\"account\\\":\\\"u_afuwgy6h01\\\"}\"}",
            "{\"rowData\":\"{\\\"user_id\\\":\\\"236669146641\\\",\\\"account\\\":\\\"u_afuwgy6h01\\\"}\"}",
        };
        sendAllAndClose(topic, payloads);
    }

    /**
     * Sends each payload as a key-less record, in order, and always closes the producer.
     */
    private void sendAllAndClose(String topic, String... payloads) {
        try {
            for (String payload : payloads) {
                producer.send(new ProducerRecord<String, String>(topic, payload));
            }
        } finally {
            // Close even if a send throws, so the producer is never left open.
            producer.close();
        }
    }

    /**
     * Renders one user-info message; only suid, phone and user_id vary between fixtures.
     */
    private static String userInfoJson(String suid, String phone, String userId) {
        return "{\n"
                + "\t\"suid\": \"" + suid + "\",\n"
                + "\t\"phone\": \"" + phone + "\",\n"
                + "\t\"name\": \"王小锤\",\n"
                + "\t\"user_id\": \"" + userId + "\",\n"
                + "\t\"ymd\": \"2019-02-19\",\n"
                + "\t\"create_time\": \"2019-02-19 15:14:32\",\n"
                + "\t\"source_table\": \"fdxxx\"\n"
                + "}";
    }

    /**
     * Publishes the {@code rowData} fixtures to {@code rtsync.bdl_riskctrl_active_apply_info}.
     *
     * <p>To publish the user-info fixtures instead, call
     * {@code new SendMessageTOKafka().produce("flinkwindow_test")}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new SendMessageTOKafka().produceP0("rtsync.bdl_riskctrl_active_apply_info");
    }

}
